package com.stay4it.rxjava;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.stay4it.rxjava.bean.Course;
import com.stay4it.rxjava.bean.Entity;
import com.stay4it.rxjava.bean.Student;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

/**
 * RxJava组合操作符 本节参考文章：http://blog.csdn.net/jdsjlzx/article/details/52415615
 */
public class RxJava06 {

	/**
	 * Merge 解释：将2-9个Observables合并到一个Observable中进行发射，合并后的数据可能会是交错（无序）的（如果想要没有交错，
	 * 可以使用concat操作符） merge还可以传递一个Observable列表List，数组
	 * 甚至是一个发射Observable序列的Observable，merge将合并它们的输出作为单个Observable的输出
	 */
	private static void test1() {
		Observable<String> letterObservable = Observable.just("A", "B", "C", "D", "E", "F", "G", "H");
		Observable<Integer> numberObservable = Observable.just(1, 2, 3, 4, 5);

		Observable.merge(letterObservable, numberObservable)
		.subscribe(new Action1<Serializable>() {

			@Override
			public void call(Serializable value) {
				System.out.println("value = " + value);
			}
		});
		
	}

	/**
	 * Zip 
	 * 结合两个或多个Observables发射的数据项，每个数据只能组合一次，而且都是有序的。
	 * 它只发射与发射数据项最少的那个Observable一样多的数据。
	 * 
	 * 应用场景参考：http://blog.csdn.net/jdsjlzx/article/details/51724087
	 */
	private static void test2() {
		Observable<String> letterObservable = Observable.just("A", "B", "C", "D", "E", "F", "G", "H");
		Observable<Integer> numberObservable = Observable.just(1, 2, 3, 4, 5);

		Observable.zip(letterObservable, numberObservable, new Func2<String, Integer, String>() {

			@Override
			public String call(String t1, Integer t2) {
				return t1 + t2;
			}
		}).subscribe(new Action1<String>() {

			@Override
			public void call(String value) {
				System.out.println("value = " + value);
			}
		});

	}

	/**
	 * CombineLatest
	 * 解释：combineLatest操作符把两个Observable产生的结果进行合并，合并的结果组成一个新的Observable。
	 * 这两个Observable中任意一个Observable产生的结果，都和另一个Observable最后产生的结果，按照一定的规则进行合并。
	 */
	private static void test3() {
		Observable<String> letterObservable = Observable.just("A", "B", "C");
		Observable<Integer> numberObservable = Observable.just(4, 5);

		//letterObservable numberObservable谁在前谁在后都会对执行结果会有影响
		Observable.combineLatest( numberObservable, letterObservable,new Func2<Integer, String,  String>() {

			@Override
			public String call(Integer t1, String t2) {
				System.out.println("combine  t1 = " + t1 + " | t2 = " + t2);// t1的值一开始就是letterObservable中的最后一个值
				return t1 + t2;
			}
		}).subscribe(new Action1<String>() {

			@Override
			public void call(String value) {
				System.out.println("value = " + value);
			}
		});

	}

	/**
	 * Join 类似于combineLatest操作符，但是join操作符可以控制每个Observable产生结果的生命周期，在每个结果的生命周期内，
	 * 可以与另一个Observable产 生的结果按照一定的规则进行合并
	 * 
	 * Join(Observable,Func1,Func1,Func2) 需要传递四个参数
	 * 
	 * join操作符的用法如下： observableA.join(observableB, observableA产生结果生命周期控制函数，
	 * observableB产生结果生命周期控制函数， observableA产生的结果与observableB产生的结果的合并规则）
	 * 
	 * 一句话概括：在observableA的生命周期内：observableB输出的数据项与observableA输出的数据项每个合并
	 * 
	 * test4() 没有任何合并结果输出 分析:同一线程:observableA的生命周期已经执行完了,observableB还没出来,所以合并不了
	 * 
	 */
	private static void test4() {

		Observable<Integer> observableA = Observable.range(1, 5);

		List<Integer> data = Arrays.asList(6, 7, 8, 9, 10);
		Observable<Integer> observableB = Observable.from(data);

		observableA.join(observableB, new Func1<Integer, Observable<Integer>>() {
			@Override
			public Observable<Integer> call(Integer value) {
				//return Observable.just(value);
				return Observable.just(value).delay(1, TimeUnit.SECONDS);
			}
		}, new Func1<Integer, Observable<Integer>>() {
			@Override
			public Observable<Integer> call(Integer value) {
				return Observable.just(value);
			}
		}, new Func2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer value1, Integer value2) {
				System.out.println("left: " + value1 + "  right:" + value2);
				return value1 + value2;
			}
		}).subscribe(new Observer<Integer>() {
			@Override
			public void onCompleted() {
				System.out.println("onCompleted");
			}

			@Override
			public void onError(Throwable e) {
			}

			@Override
			public void onNext(Integer value) {
				System.out.println("onNext value = " + value);
			}
		});

	}

	/**
	 * switchOnNext操作符 解释：switchOnNext操作符是把一组Observable转换成一个Observable
	 * 
	 */
	private static void test5() {
		Observable.switchOnNext(Observable.just(Observable.range(0, 2), Observable.range(100, 3)))
				.subscribe(new Action1<Integer>() {
					@Override
					public void call(Integer integer) {
						System.out.println("onNext value = " + integer);
					}
				});

	}

	/**
	 * startWith操作符 解释：在源Observable输出之前插入指定数据项
	 * 
	 */
	private static void test6() {
		Observable.just(7, 8, 9).startWith(11, 12)
		.subscribe(new Observer<Integer>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted ");
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("onError " + e.toString());
			}

			@Override
			public void onNext(Integer t) {
				System.out.println("onNext t = " + t);
			}
		});

	}

	public static void main(String[] args) throws InterruptedException {
		test6();

	}

}
